{
  "nbformat": 4,
  "nbformat_minor": 0,
  "metadata": {
    "colab": {
      "name": "hw10_adversarial_attack.ipynb",
      "provenance": [],
      "collapsed_sections": [],
      "toc_visible": true
    },
    "kernelspec": {
      "name": "python3",
      "display_name": "Python 3"
    },
    "accelerator": "GPU"
  },
  "cells": [
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "Q-n2e0BkhEKS"
      },
      "source": [
        "# **Homework 10 - Adversarial Attack**\n",
        "\n",
        "Slides: https://reurl.cc/v5kXkk\n",
        "\n",
        "Video(En): https://youtu.be/313TZsUDQ48\n",
        "\n",
        "Video(Zh): https://youtu.be/xWHpPEvkDiE\n",
        "\n",
        "TA: ntu-ml-2021spring-ta@googlegroups.com"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "9RX7iRXrhMA_"
      },
      "source": [
        "## Environment & Download\n",
        "\n",
        "We use [pytorchcv](https://pypi.org/project/pytorchcv/) to obtain a CIFAR-10 pretrained model, so we need to set up the environment first. We also need to download the data (200 images) that we want to attack."
      ]
    },
    {
      "cell_type": "code",
      "metadata": {
        "id": "d4Lw7urignqP"
      },
      "source": [
        "# set up environment\n",
        "!pip install pytorchcv\n",
        "\n",
        "# download\n",
        "!gdown --id 1fHi1ko7wr80wXkXpqpqpOxuYH1mClXoX -O data.zip\n",
        "\n",
        "# unzip\n",
        "!unzip ./data.zip\n",
        "!rm ./data.zip"
      ],
      "execution_count": null,
      "outputs": []
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "hkQQf0l1hbBs"
      },
      "source": [
        "## Global Settings\n",
        "\n",
        "* $\\epsilon$ is fixed to 8 on the raw pixel scale (0-255). In the **Data** section, however, we first transform the raw pixel values with **ToTensor (to the 0-1 scale)** and then **Normalize (subtract the mean, divide by the std)**, so $\\epsilon$ should be set to $\\frac{8}{255 \\cdot std}$ during the attack.\n",
        "\n",
        "* Explanation (optional)\n",
        "    * Denote the first pixel of the original image as $p$ and the first pixel of the adversarial image as $a$.\n",
        "    * The $\\epsilon$ constraint tells us that $\\left| p-a \\right| \\le 8$.\n",
        "    * ToTensor() can be seen as a function $T(x) = x/255$.\n",
        "    * Normalize() can be seen as a function $N(x) = (x-mean)/std$, where $mean$ and $std$ are constants.\n",
        "    * After applying ToTensor() and Normalize() to $p$ and $a$, the constraint becomes $\\left| N(T(p))-N(T(a)) \\right| = \\left| \\frac{\\frac{p}{255}-mean}{std}-\\frac{\\frac{a}{255}-mean}{std} \\right| = \\frac{1}{255 \\cdot std} \\left| p-a \\right| \\le \\frac{8}{255 \\cdot std}.$\n",
        "    * So, we should set $\\epsilon$ to $\\frac{8}{255 \\cdot std}$ after ToTensor() and Normalize()."
      ]
    },
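    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "The derivation above can be checked numerically. The cell below is a minimal sketch and not part of the homework pipeline: every `demo_*` name is hypothetical and chosen so that it does not overwrite the notebook's own variables. It assumes the same per-channel CIFAR-10 mean and std that this notebook uses, and it checks that a raw pixel difference of 8 becomes $\\frac{8}{255 \\cdot std}$ after ToTensor() and Normalize()."
      ]
    },
    {
      "cell_type": "code",
      "metadata": {},
      "source": [
        "import torch\n",
        "\n",
        "# illustrative check only: every demo_* name is hypothetical\n",
        "demo_mean = torch.tensor((0.491, 0.482, 0.447)).view(3, 1, 1)\n",
        "demo_std = torch.tensor((0.202, 0.199, 0.201)).view(3, 1, 1)\n",
        "\n",
        "def demo_to_normalized(pixels):\n",
        "    # ToTensor scaling (x / 255) followed by Normalize ((x - mean) / std)\n",
        "    return (pixels / 255 - demo_mean) / demo_std\n",
        "\n",
        "# two 3-channel images whose raw pixel values differ by exactly 8\n",
        "demo_p = torch.full((3, 2, 2), 100.0)\n",
        "demo_a = demo_p + 8\n",
        "\n",
        "demo_gap = (demo_to_normalized(demo_a) - demo_to_normalized(demo_p)).abs()\n",
        "demo_epsilon = 8 / 255 / demo_std # per-channel budget, shape (3, 1, 1)\n",
        "\n",
        "print(demo_epsilon.flatten())\n",
        "print(torch.allclose(demo_gap, demo_epsilon.expand_as(demo_gap), atol=1e-5))"
      ],
      "execution_count": null,
      "outputs": []
    },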
    {
      "cell_type": "code",
      "metadata": {
        "id": "ACghc_tsg2vE"
      },
      "source": [
        "import torch\n",
        "import torch.nn as nn\n",
        "\n",
        "device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')\n",
        "\n",
        "batch_size = 8\n",
        "\n",
        "# the mean and std are the calculated statistics from cifar_10 dataset\n",
        "cifar_10_mean = (0.491, 0.482, 0.447) # mean for the three channels of cifar_10 images\n",
        "cifar_10_std = (0.202, 0.199, 0.201) # std for the three channels of cifar_10 images\n",
        "\n",
        "# convert mean and std to 3-dimensional tensors for future operations\n",
        "mean = torch.tensor(cifar_10_mean).to(device).view(3, 1, 1)\n",
        "std = torch.tensor(cifar_10_std).to(device).view(3, 1, 1)\n",
        "\n",
        "epsilon = 8/255/std\n",
        "# TODO: iterative fgsm attack\n",
        "# alpha (step size) can be decided by yourself\n",
        "alpha = 0.8/255/std\n",
        "\n",
        "root = './data' # directory for storing benign images\n",
        "# benign images: images which do not contain adversarial perturbations\n",
        "# adversarial images: images which include adversarial perturbations"
      ],
      "execution_count": null,
      "outputs": []
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "lhBJBAlKherZ"
      },
      "source": [
        "## Data\n",
        "\n",
        "Construct dataset and dataloader from root directory. Note that we store the filename of each image for future usage."
      ]
    },
    {
      "cell_type": "code",
      "metadata": {
        "id": "VXpRAHz0hkDt"
      },
      "source": [
        "import os\n",
        "import glob\n",
        "import shutil\n",
        "import numpy as np\n",
        "from PIL import Image\n",
        "from torchvision.transforms import transforms\n",
        "from torch.utils.data import Dataset, DataLoader\n",
        "\n",
        "transform = transforms.Compose([\n",
        "    transforms.ToTensor(),\n",
        "    transforms.Normalize(cifar_10_mean, cifar_10_std)\n",
        "])\n",
        "\n",
        "class AdvDataset(Dataset):\n",
        "    def __init__(self, data_dir, transform):\n",
        "        self.images = []\n",
        "        self.labels = []\n",
        "        self.names = []\n",
        "        '''\n",
        "        data_dir\n",
        "        ├── class_dir\n",
        "        │   ├── class1.png\n",
        "        │   ├── ...\n",
        "        │   ├── class20.png\n",
        "        '''\n",
        "        for i, class_dir in enumerate(sorted(glob.glob(f'{data_dir}/*'))):\n",
        "            images = sorted(glob.glob(f'{class_dir}/*'))\n",
        "            self.images += images\n",
        "            self.labels += ([i] * len(images))\n",
        "            self.names += [os.path.relpath(imgs, data_dir) for imgs in images]\n",
        "        self.transform = transform\n",
        "    def __getitem__(self, idx):\n",
        "        image = self.transform(Image.open(self.images[idx]))\n",
        "        label = self.labels[idx]\n",
        "        return image, label\n",
        "    def __getname__(self):\n",
        "        return self.names\n",
        "    def __len__(self):\n",
        "        return len(self.images)\n",
        "\n",
        "adv_set = AdvDataset(root, transform=transform)\n",
        "adv_names = adv_set.__getname__()\n",
        "adv_loader = DataLoader(adv_set, batch_size=batch_size, shuffle=False)\n",
        "\n",
        "print(f'number of images = {adv_set.__len__()}')"
      ],
      "execution_count": null,
      "outputs": []
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "LnszlTsYrTQZ"
      },
      "source": [
        "## Utils -- Benign Images Evaluation"
      ]
    },
    {
      "cell_type": "code",
      "metadata": {
        "id": "5c_zZLzkrceE"
      },
      "source": [
        "# evaluate the performance of the model on benign images\n",
        "def epoch_benign(model, loader, loss_fn):\n",
        "    model.eval()\n",
        "    train_acc, train_loss = 0.0, 0.0\n",
        "    with torch.no_grad(): # evaluation only, so no gradients are needed\n",
        "        for x, y in loader:\n",
        "            x, y = x.to(device), y.to(device)\n",
        "            yp = model(x)\n",
        "            loss = loss_fn(yp, y)\n",
        "            train_acc += (yp.argmax(dim=1) == y).sum().item()\n",
        "            train_loss += loss.item() * x.shape[0]\n",
        "    return train_acc / len(loader.dataset), train_loss / len(loader.dataset)"
      ],
      "execution_count": null,
      "outputs": []
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "_YJxK7YehqQy"
      },
      "source": [
        "## Utils -- Attack Algorithm"
      ]
    },
    {
      "cell_type": "code",
      "metadata": {
        "id": "F_1wKfKyhrQW"
      },
      "source": [
        "# perform fgsm attack\n",
        "def fgsm(model, x, y, loss_fn, epsilon=epsilon):\n",
        "    x_adv = x.detach().clone() # initialize x_adv as original benign image x\n",
        "    x_adv.requires_grad = True # need to obtain gradient of x_adv, thus set requires_grad\n",
        "    loss = loss_fn(model(x_adv), y) # calculate loss\n",
        "    loss.backward() # calculate gradient\n",
        "    # fgsm: use gradient ascent on x_adv to maximize loss\n",
        "    x_adv = x_adv + epsilon * x_adv.grad.detach().sign()\n",
        "    return x_adv\n",
        "\n",
        "# perform iterative fgsm attack\n",
        "# alpha (the step size, set in the Global Settings section) and num_iter can be decided by yourself\n",
        "def ifgsm(model, x, y, loss_fn, epsilon=epsilon, alpha=alpha, num_iter=20):\n",
        "    x_adv = x.detach().clone() # initialize x_adv as original benign image x\n",
        "    # repeat the fgsm step num_iter times\n",
        "    for _ in range(num_iter):\n",
        "        # call fgsm with (epsilon = alpha) to obtain new x_adv\n",
        "        x_adv = fgsm(model, x_adv, y, loss_fn, alpha)\n",
        "        # clip new x_adv back to [x-epsilon, x+epsilon]\n",
        "        x_adv = torch.max(torch.min(x_adv, x + epsilon), x - epsilon)\n",
        "    return x_adv"
      ],
      "execution_count": null,
      "outputs": []
    },
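    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "The clipping step of the iterative attack can be checked in isolation. The cell below is a minimal sketch under stated assumptions: it assumes an elementwise projection onto $[x-\\epsilon, x+\\epsilon]$ with a per-channel $\\epsilon$ of shape (3, 1, 1), and every `demo_*` name is hypothetical so that the notebook's own variables stay untouched. It perturbs a random batch far beyond the budget, projects it back, and checks that no element exceeds the budget."
      ]
    },
    {
      "cell_type": "code",
      "metadata": {},
      "source": [
        "import torch\n",
        "\n",
        "# illustrative check only: every demo_* name is hypothetical\n",
        "def demo_project(x_adv, x, eps):\n",
        "    # elementwise clip of x_adv into [x - eps, x + eps]; eps broadcasts per channel\n",
        "    return torch.max(torch.min(x_adv, x + eps), x - eps)\n",
        "\n",
        "torch.manual_seed(0)\n",
        "demo_eps = 8 / 255 / torch.tensor((0.202, 0.199, 0.201)).view(3, 1, 1)\n",
        "demo_x = torch.randn(4, 3, 32, 32) # stand-in for a normalized image batch\n",
        "demo_x_far = demo_x + torch.randn_like(demo_x) # perturbation far outside the budget\n",
        "\n",
        "demo_x_proj = demo_project(demo_x_far, demo_x, demo_eps)\n",
        "# largest amount by which any element exceeds its per-channel budget\n",
        "demo_excess = ((demo_x_proj - demo_x).abs() - demo_eps).max().item()\n",
        "print(demo_excess <= 1e-6)"
      ],
      "execution_count": null,
      "outputs": []
    },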
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "fYCEQwmcrmH6"
      },
      "source": [
        "## Utils -- Attack\n",
        "\n",
        "* Recall\n",
        "    * ToTensor() can be seen as a function $T(x) = x/255$.\n",
        "    * Normalize() can be seen as a function $N(x) = (x-mean)/std$, where $mean$ and $std$ are constants.\n",
        "\n",
        "* Inverse function\n",
        "    * Inverse Normalize() can be seen as a function $N^{-1}(x) = x*std+mean$, where $mean$ and $std$ are constants.\n",
        "    * Inverse ToTensor() can be seen as a function $T^{-1}(x) = x*255$.\n",
        "\n",
        "* Notes\n",
        "    * ToTensor() also converts the image from shape (height, width, channel) to shape (channel, height, width), so we need to transpose the image back to its original shape.\n",
        "    * Since our dataloader samples a batch of data, what we need here is to transpose **(batch_size, channel, height, width)** back to **(batch_size, height, width, channel)** using np.transpose."
      ]
    },
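    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "The inverse transforms above can be sketched as a round trip on random data. The cell below is an illustration under stated assumptions, not the homework's own code: every `demo_*` name is hypothetical, and the forward pass is written out by hand as $N(T(x))$ instead of calling torchvision. It starts from a uint8 batch of shape (batch_size, height, width, channel), applies the forward transforms, inverts them, and checks that the original pixel values are recovered."
      ]
    },
    {
      "cell_type": "code",
      "metadata": {},
      "source": [
        "import numpy as np\n",
        "import torch\n",
        "\n",
        "# illustrative check only: every demo_* name is hypothetical\n",
        "demo_mean = torch.tensor((0.491, 0.482, 0.447)).view(3, 1, 1)\n",
        "demo_std = torch.tensor((0.202, 0.199, 0.201)).view(3, 1, 1)\n",
        "\n",
        "demo_rng = np.random.default_rng(0)\n",
        "demo_raw = demo_rng.integers(0, 256, size=(2, 32, 32, 3), dtype=np.uint8) # (bs, H, W, C)\n",
        "\n",
        "# forward: (bs, H, W, C) -> (bs, C, H, W), then T(x) = x / 255 and N(x) = (x - mean) / std\n",
        "demo_x = torch.from_numpy(demo_raw).permute(0, 3, 1, 2).float()\n",
        "demo_x = (demo_x / 255 - demo_mean) / demo_std\n",
        "\n",
        "# inverse: undo Normalize, undo ToTensor scaling, round, transpose back to (bs, H, W, C)\n",
        "demo_back = ((demo_x * demo_std + demo_mean) * 255).clamp(0, 255)\n",
        "demo_back = demo_back.numpy().round().transpose((0, 2, 3, 1)).astype(np.uint8)\n",
        "\n",
        "print(np.array_equal(demo_back, demo_raw))"
      ],
      "execution_count": null,
      "outputs": []
    },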
    {
      "cell_type": "code",
      "metadata": {
        "id": "w5X_9x-7ro_w"
      },
      "source": [
        "# perform adversarial attack and generate adversarial examples\n",
        "def gen_adv_examples(model, loader, attack, loss_fn):\n",
        "    model.eval()\n",
        "    adv_names = []\n",
        "    train_acc, train_loss = 0.0, 0.0\n",
        "    for i, (x, y) in enumerate(loader):\n",
        "        x, y = x.to(device), y.to(device)\n",
        "        x_adv = attack(model, x, y, loss_fn) # obtain adversarial examples\n",
        "        yp = model(x_adv)\n",
        "        loss = loss_fn(yp, y)\n",
        "        train_acc += (yp.argmax(dim=1) == y).sum().item()\n",
        "        train_loss += loss.item() * x.shape[0]\n",
        "        # store adversarial examples\n",
        "        adv_ex = ((x_adv) * std + mean).clamp(0, 1) # to 0-1 scale\n",
        "        adv_ex = (adv_ex * 255).clamp(0, 255) # 0-255 scale\n",
        "        adv_ex = adv_ex.detach().cpu().numpy().round() # round to remove decimal part\n",
        "        adv_ex = adv_ex.transpose((0, 2, 3, 1)) # transpose (bs, C, H, W) back to (bs, H, W, C)\n",
        "        adv_examples = adv_ex if i == 0 else np.r_[adv_examples, adv_ex]\n",
        "    return adv_examples, train_acc / len(loader.dataset), train_loss / len(loader.dataset)\n",
        "\n",
        "# create directory which stores adversarial examples\n",
        "def create_dir(data_dir, adv_dir, adv_examples, adv_names):\n",
        "    if not os.path.exists(adv_dir):\n",
        "        _ = shutil.copytree(data_dir, adv_dir)\n",
        "    for example, name in zip(adv_examples, adv_names):\n",
        "        im = Image.fromarray(example.astype(np.uint8)) # image pixel value should be unsigned int\n",
        "        im.save(os.path.join(adv_dir, name))"
      ],
      "execution_count": null,
      "outputs": []
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "r_pMkmPytX3k"
      },
      "source": [
        "## Model / Loss Function\n",
        "\n",
        "The model list is available [here](https://github.com/osmr/imgclsmob/blob/master/pytorch/pytorchcv/model_provider.py). Please select models that have the `_cifar10` suffix. Some of the models cannot be accessed/loaded. You can safely skip them, since the TA's model will not use any of them."
      ]
    },
    {
      "cell_type": "code",
      "metadata": {
        "id": "jwto8xbPtYzQ"
      },
      "source": [
        "from pytorchcv.model_provider import get_model as ptcv_get_model\n",
        "\n",
        "model = ptcv_get_model('resnet110_cifar10', pretrained=True).to(device)\n",
        "loss_fn = nn.CrossEntropyLoss()\n",
        "\n",
        "benign_acc, benign_loss = epoch_benign(model, adv_loader, loss_fn)\n",
        "print(f'benign_acc = {benign_acc:.5f}, benign_loss = {benign_loss:.5f}')"
      ],
      "execution_count": null,
      "outputs": []
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "uslb7GPchtMI"
      },
      "source": [
        "## FGSM"
      ]
    },
    {
      "cell_type": "code",
      "metadata": {
        "id": "wQwPTVUIhuTS"
      },
      "source": [
        "adv_examples, fgsm_acc, fgsm_loss = gen_adv_examples(model, adv_loader, fgsm, loss_fn)\n",
        "print(f'fgsm_acc = {fgsm_acc:.5f}, fgsm_loss = {fgsm_loss:.5f}')\n",
        "\n",
        "create_dir(root, 'fgsm', adv_examples, adv_names)"
      ],
      "execution_count": null,
      "outputs": []
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "WXw6p0A6shZm"
      },
      "source": [
        "## I-FGSM"
      ]
    },
    {
      "cell_type": "code",
      "metadata": {
        "id": "fUEsT06Iskt2"
      },
      "source": [
        "# TODO: iterative fgsm attack\n",
        "# adv_examples, ifgsm_acc, ifgsm_loss = gen_adv_examples(model, adv_loader, ifgsm, loss_fn)\n",
        "# print(f'ifgsm_acc = {ifgsm_acc:.5f}, ifgsm_loss = {ifgsm_loss:.5f}')\n",
        "\n",
        "# create_dir(root, 'ifgsm', adv_examples, adv_names)"
      ],
      "execution_count": null,
      "outputs": []
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "DQ-nYkkYexEE"
      },
      "source": [
        "## Compress the images"
      ]
    },
    {
      "cell_type": "code",
      "metadata": {
        "id": "ItRo_S0M264N"
      },
      "source": [
        "%cd fgsm\n",
        "!tar zcvf ../fgsm.tgz *\n",
        "%cd ..\n",
        "\n",
        "# %cd ifgsm\n",
        "# !tar zcvf ../ifgsm.tgz *\n",
        "# %cd .."
      ],
      "execution_count": null,
      "outputs": []
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "0FM_S886kFd8"
      },
      "source": [
        "## Visualization"
      ]
    },
    {
      "cell_type": "code",
      "metadata": {
        "id": "2FCuE2njkH1O"
      },
      "source": [
        "import matplotlib.pyplot as plt\n",
        "\n",
        "classes = ['airplane', 'automobile', 'bird', 'cat', 'deer', 'dog', 'frog', 'horse', 'ship', 'truck']\n",
        "\n",
        "plt.figure(figsize=(10, 20))\n",
        "cnt = 0\n",
        "for i, cls_name in enumerate(classes):\n",
        "    path = f'{cls_name}/{cls_name}1.png'\n",
        "    # benign image\n",
        "    cnt += 1\n",
        "    plt.subplot(len(classes), 4, cnt)\n",
        "    im = Image.open(f'./data/{path}')\n",
        "    logit = model(transform(im).unsqueeze(0).to(device))[0]\n",
        "    predict = logit.argmax(-1).item()\n",
        "    prob = logit.softmax(-1)[predict].item()\n",
        "    plt.title(f'benign: {cls_name}1.png\\n{classes[predict]}: {prob:.2%}')\n",
        "    plt.axis('off')\n",
        "    plt.imshow(np.array(im))\n",
        "    # adversarial image\n",
        "    cnt += 1\n",
        "    plt.subplot(len(classes), 4, cnt)\n",
        "    im = Image.open(f'./fgsm/{path}')\n",
        "    logit = model(transform(im).unsqueeze(0).to(device))[0]\n",
        "    predict = logit.argmax(-1).item()\n",
        "    prob = logit.softmax(-1)[predict].item()\n",
        "    plt.title(f'adversarial: {cls_name}1.png\\n{classes[predict]}: {prob:.2%}')\n",
        "    plt.axis('off')\n",
        "    plt.imshow(np.array(im))\n",
        "plt.tight_layout()\n",
        "plt.show()"
      ],
      "execution_count": null,
      "outputs": []
    }
  ]
}